/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.tx;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@Category(ParallelStatsDisabledTest.class)
@RunWith(Parameterized.class)
public class TxCheckpointIT extends ParallelStatsDisabledIT {

  private final boolean localIndex;
  private final String tableDDLOptions;

  @BeforeClass
  public static synchronized void forceClearTables() throws Exception {
    // We see this specific test hanging with Heap memory problems.
    // Try to free as much resources as we can before starting
    resetHbase();
    doSetup();
  }

  public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded,
    String transactionProvider) {
    StringBuilder optionBuilder = new StringBuilder();
    optionBuilder.append("TRANSACTION_PROVIDER='" + transactionProvider + "'");
    this.localIndex = localIndex;
    if (!columnEncoded) {
      optionBuilder.append(",COLUMN_ENCODED_BYTES=0");
    }
    if (!mutable) {
      optionBuilder.append(",IMMUTABLE_ROWS=true");
      if (!columnEncoded) {
        optionBuilder.append(
          ",IMMUTABLE_STORAGE_SCHEME=" + PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
      }
    }
    this.tableDDLOptions = optionBuilder.toString();
  }

  private static Connection getConnection() throws SQLException {
    return getConnection(PropertiesUtil.deepCopy(TEST_PROPERTIES));
  }

  private static Connection getConnection(Properties props) throws SQLException {
    props.setProperty(QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB, Boolean.toString(true));
    Connection conn = DriverManager.getConnection(getUrl(), props);
    return conn;
  }

  // name is used by failsafe as file name in reports
  @Parameters(
      name = "TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2},transactionProvider={3}")
  public static synchronized Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {
      // OMID does not support local indexes or column encoding
      { false, false, false, "OMID" }, { false, true, false, "OMID" }, });
  }

  @Test
  public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
    String tableName = "TBL_" + generateUniqueName();
    String indexName = "IDX_" + generateUniqueName();
    String seqName = "SEQ_" + generateUniqueName();
    String fullTableName = SchemaUtil.getTableName(tableName, tableName);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
    props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
    props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
    Connection conn = getConnection(props);
    conn.setAutoCommit(true);
    conn.createStatement().execute("CREATE SEQUENCE " + seqName);
    conn.createStatement().execute(
      "CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions);
    conn.createStatement().execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName
      + " ON " + fullTableName + "(val)");

    conn.createStatement()
      .execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)");
    for (int i = 0; i < 6; i++) {
      Statement stmt = conn.createStatement();
      int upsertCount = stmt.executeUpdate("UPSERT INTO " + fullTableName
        + " SELECT NEXT VALUE FOR " + seqName + ", val FROM " + fullTableName);
      assertEquals((int) Math.pow(2, i), upsertCount);
    }
    conn.close();
  }

  @Test
  public void testRollbackOfUncommittedDeleteSingleCol() throws Exception {
    String tableName = "TBL_" + generateUniqueName();
    String indexName = "IDX_" + generateUniqueName();
    String fullTableName = SchemaUtil.getTableName(tableName, tableName);
    String indexDDL = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON "
      + fullTableName + " (v1) INCLUDE(v2)";
    testRollbackOfUncommittedDelete(indexDDL, fullTableName);
  }

  @Test
  public void testRollbackOfUncommittedDeleteMultiCol() throws Exception {
    String tableName = "TBL_" + generateUniqueName();
    String indexName = "IDX_" + generateUniqueName();
    String fullTableName = SchemaUtil.getTableName(tableName, tableName);
    String indexDDL = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON "
      + fullTableName + " (v1, v2)";
    testRollbackOfUncommittedDelete(indexDDL, fullTableName);
  }

  private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName)
    throws Exception {
    Connection conn = getConnection();
    conn.setAutoCommit(false);
    try {
      Statement stmt = conn.createStatement();
      stmt.execute("CREATE TABLE " + fullTableName
        + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
      stmt.execute(indexDDL);

      stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')");
      stmt.executeUpdate("upsert into " + fullTableName + " values('x2', 'y2', 'a2')");

      // assert values in data table
      ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
      assertTrue(rs.next());
      assertEquals("x1", rs.getString(1));
      assertEquals("y1", rs.getString(2));
      assertEquals("a1", rs.getString(3));
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());

      // assert values in index table
      rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
      assertTrue(rs.next());
      assertEquals("x1", rs.getString(1));
      assertEquals("y1", rs.getString(2));
      assertEquals("a1", rs.getString(3));
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());

      conn.commit();

      stmt.executeUpdate("DELETE FROM " + fullTableName + " WHERE k='x1' AND v1='y1' AND v2='a1'");
      // assert row is delete in data table
      rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());

      // assert row is deleted in data table
      rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());

      // assert row is deleted in index table
      rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());

      conn.rollback();

      // assert two rows in data table
      rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
      assertTrue(rs.next());
      assertEquals("x1", rs.getString(1));
      assertEquals("y1", rs.getString(2));
      assertEquals("a1", rs.getString(3));
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());

      // assert two rows in index table
      rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
      assertTrue(rs.next());
      assertEquals("x1", rs.getString(1)); // fails here
      assertEquals("y1", rs.getString(2));
      assertEquals("a1", rs.getString(3));
      assertTrue(rs.next());
      assertEquals("x2", rs.getString(1));
      assertEquals("y2", rs.getString(2));
      assertEquals("a2", rs.getString(3));
      assertFalse(rs.next());
    } finally {
      conn.close();
    }
  }

  @Test
  public void testCheckpointForUpsertSelect() throws Exception {
    String tableName = "TBL_" + generateUniqueName();
    String indexName = "IDX_" + generateUniqueName();
    String fullTableName = SchemaUtil.getTableName(tableName, tableName);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = getConnection()) {
      conn.setAutoCommit(false);
      Statement stmt = conn.createStatement();

      stmt.execute("CREATE TABLE " + fullTableName
        + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
      stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON "
        + fullTableName + " (v1) INCLUDE(v2)");

      stmt.executeUpdate("upsert into " + fullTableName + " values(1, 'a2', 'b1')");
      stmt.executeUpdate("upsert into " + fullTableName + " values(2, 'a2', 'b2')");
      stmt.executeUpdate("upsert into " + fullTableName + " values(3, 'a3', 'b3')");
      conn.commit();

      upsertRows(conn, fullTableName);
      conn.rollback();
      verifyRows(conn, fullTableName, 3);

      upsertRows(conn, fullTableName);
      conn.commit();
      verifyRows(conn, fullTableName, 6);
    }
  }

  private void verifyRows(Connection conn, String fullTableName, int expectedMaxId)
    throws SQLException {
    ResultSet rs;
    // query the data table
    rs = conn.createStatement()
      .executeQuery("select /*+ NO_INDEX */ max(id) from " + fullTableName + "");
    assertTrue(rs.next());
    assertEquals(expectedMaxId, rs.getLong(1));
    assertFalse(rs.next());

    // query the index
    rs = conn.createStatement()
      .executeQuery("select /*+ INDEX(DEMO IDX) */ max(id) from " + fullTableName + "");
    assertTrue(rs.next());
    assertEquals(expectedMaxId, rs.getLong(1));
    assertFalse(rs.next());
  }

  private void upsertRows(Connection conn, String fullTableName) throws SQLException {
    ResultSet rs;
    MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
    conn.createStatement().executeQuery("select 1 from " + fullTableName + " LIMIT 1").next();
    long wp = state.getWritePointer();
    conn.createStatement().execute(
      "upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
    assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
    rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");

    assertTrue(rs.next());
    assertEquals(4, rs.getLong(1));
    assertFalse(rs.next());

    conn.createStatement().execute(
      "upsert into " + fullTableName + " select max(id)+1, 'a5', 'b5' from " + fullTableName + "");
    assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
    assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
    // moves
    wp = state.getWritePointer();
    rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");

    assertTrue(rs.next());
    assertEquals(5, rs.getLong(1));
    assertFalse(rs.next());

    conn.createStatement().execute(
      "upsert into " + fullTableName + " select max(id)+1, 'a6', 'b6' from " + fullTableName + "");
    assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
    assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
    // moves
    wp = state.getWritePointer();
    rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");

    assertTrue(rs.next());
    assertEquals(6, rs.getLong(1));
    assertFalse(rs.next());
  }

  @Test
  public void testCheckpointForDeleteAndUpsert() throws Exception {
    String tableName = "TBL_" + generateUniqueName();
    String indexName = "IDX_" + generateUniqueName();
    String fullTableName = SchemaUtil.getTableName(tableName, tableName);
    ResultSet rs;
    try (Connection conn = getConnection()) {
      conn.setAutoCommit(false);
      Statement stmt = conn.createStatement();
      stmt.execute("CREATE TABLE " + fullTableName
        + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions);
      stmt.execute("CREATE TABLE " + fullTableName
        + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions);
      stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON "
        + fullTableName + "1 (FK1B)");

      stmt.executeUpdate("upsert into " + fullTableName + "1 values (1, 3, 3)");
      stmt.executeUpdate("upsert into " + fullTableName + "1 values (2, 2, 2)");
      stmt.executeUpdate("upsert into " + fullTableName + "1 values (3, 1, 1)");
      stmt.executeUpdate("upsert into " + fullTableName + "2 values (1, 1)");
      conn.commit();

      MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
      // Start a new transaction
      stmt.executeQuery("select 1 from " + fullTableName + "1 LIMIT 1").next();
      long wp = state.getWritePointer();
      conn.createStatement()
        .execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
      assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
      assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move

      rs = conn.createStatement()
        .executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertTrue(rs.next());
      assertEquals(3, rs.getLong(1));
      assertFalse(rs.next());

      rs = conn.createStatement()
        .executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(3, rs.getLong(1));
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertFalse(rs.next());

      conn.createStatement()
        .execute("delete from " + fullTableName + "1 where id1 in (select fk1a from "
          + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
      assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
      assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved

      rs = conn.createStatement()
        .executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertFalse(rs.next());

      rs = conn.createStatement()
        .executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertFalse(rs.next());

      stmt.executeUpdate(
        "upsert into " + fullTableName + "1 SELECT id1 + 3, id1, id1 FROM " + fullTableName + "1");
      stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 4)");

      conn.createStatement()
        .execute("delete from " + fullTableName + "1 where id1 in (select fk1a from "
          + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
      assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
      assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved

      rs = conn.createStatement()
        .executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(4, rs.getLong(1));
      assertFalse(rs.next());

      rs = conn.createStatement()
        .executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(4, rs.getLong(1));
      assertFalse(rs.next());

      conn.rollback();

      rs = conn.createStatement()
        .executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertTrue(rs.next());
      assertEquals(2, rs.getLong(1));
      assertTrue(rs.next());
      assertEquals(3, rs.getLong(1));
      assertFalse(rs.next());

      rs = conn.createStatement()
        .executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(3, rs.getLong(1));
      assertTrue(rs.next());
      assertEquals(2, rs.getLong(1));
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertFalse(rs.next());

      conn.createStatement().execute("drop index " + indexName + " on " + fullTableName + "1");
      conn.createStatement()
        .execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
      conn.createStatement()
        .execute("delete from " + fullTableName + "1 where id1 in (select fk1a from "
          + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
      assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
      assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved

      rs = conn.createStatement()
        .executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertFalse(rs.next());

      rs = conn.createStatement()
        .executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
      assertTrue(rs.next());
      assertEquals(1, rs.getLong(1));
      assertFalse(rs.next());

    }
  }

}
